package com.yycx.common.thread.impl;


import com.yycx.common.base.entity.EntityMap;
import com.yycx.common.base.utils.FlymeUtils;
import com.yycx.common.thread.IThreadPoolExecutorService;
import com.yycx.common.thread.ThreadHandler;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;

import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

/**
 * 线程池实现类
 *
 * @author zyf
 */

@Service
@Slf4j
public class ThreadPoolExecutorService implements IThreadPoolExecutorService {
    private ThreadPoolExecutor poolExecutor;
    private ThreadPoolExecutor singleExecutor;
    private Logger logger = LoggerFactory.getLogger(ThreadPoolExecutorService.class);

    public ThreadPoolExecutorService() {
        LinkedBlockingDeque<Runnable> deque = new LinkedBlockingDeque<Runnable>();
        poolExecutor = new ThreadPoolExecutor(4, 4, 5, TimeUnit.SECONDS, deque);
        poolExecutor.allowCoreThreadTimeOut(true);
        singleExecutor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, deque);
        singleExecutor.allowCoreThreadTimeOut(true);
    }

    @Override
    public void singleExecute(Runnable runnable) {

        singleExecutor.execute(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            runnable.run();
        });
    }

    @Override
    public void execute(Runnable runnable) {
        poolExecutor.execute(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            runnable.run();
        });
    }

    @PreDestroy
    public void destroy() {
        try {
            poolExecutor.getQueue().clear();
            poolExecutor.shutdown();
            poolExecutor.awaitTermination(10, TimeUnit.SECONDS);
            singleExecutor.getQueue().clear();
            singleExecutor.shutdown();
            singleExecutor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            logger.error("销毁线程池", e);
        }
        logger.info("销毁线程池");
    }

    private <T> List<Map<String, Object>> handler(List<T> list, Integer count, boolean executeType, ThreadHandler threadHandler) {
        List<Map<String, Object>> result = new ArrayList<>();
        // 开始时间
        long start = System.currentTimeMillis();
        int dataSize = list.size();

        // 每一条线程处理多少条数据
        int threadSize = 1;
        int threadNum = 1;

        //动态线程数方式
        if (executeType) {
            // 每count条数据开启一条线程
            threadSize = count;
            //计算线程数
            threadNum = dataSize / threadSize + 1;
        } else {
            threadNum = count == 1 ? 2 : count;
            // 计算线程处理量
            threadSize = dataSize / (threadNum);
            if (threadSize == 0) {
                threadSize = 1;
                threadNum = dataSize;
            }

        }

        // 创建一个线程池
        ExecutorService exec = Executors.newFixedThreadPool(threadNum);
        // 定义一个任务集合
        List<Callable<List<Map<String, Object>>>> tasks = new ArrayList<Callable<List<Map<String, Object>>>>();
        Callable<List<Map<String, Object>>> task = null;
        List<T> cutList = null;

        // 确定每条线程的数据
        for (int i = 0; i < threadNum; i++) {
            if (i == threadNum - 1) {
                log.info("线程：" + i + "数据：" + threadSize * i + "--" + dataSize);
                cutList = list.subList(threadSize * i, dataSize);
            } else {
                log.info("线程：" + i + "数据：" + threadSize * i + "--" + threadSize * (i + 1));
                cutList = list.subList(threadSize * i, threadSize * (i + 1));
            }

            final List<T> data = cutList;
            int threadNo = i;
            task = () -> {
                String threadName = Thread.currentThread().getName();
                if (FlymeUtils.isNotEmpty(data)) {
                    return threadHandler.handler(threadName, threadNo, data);
                }
                return null;
            };
            // 这里提交的任务容器列表和返回的Future列表存在顺序对应的关系
            tasks.add(task);
        }

        try {
            List<Future<List<Map<String, Object>>>> results = exec.invokeAll(tasks);
            exec.shutdown();
            for (Future<List<Map<String, Object>>> future : results) {
                List<Map<String, Object>>obj=future.get();
                if(FlymeUtils.isNotEmpty(obj)){
                    result.addAll(obj);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        log.info("任务结束,执行任务消耗了 ：" + (System.currentTimeMillis() - start) + "毫秒");
        return result;
    }

    @Override
    public <T> List<EntityMap> handlerListUseFutureTask(List<T> list, Integer count, ThreadHandler threadHandler) {
        List<EntityMap> result = new ArrayList<>();
        //根据文件的长度设置线程池
        ExecutorService executorService = Executors.newFixedThreadPool(count);
        //每一个任务的list
        List<FutureTask<EntityMap>> taskList = new ArrayList<>();
        for (int i = 0; i < list.size(); i++) {
            //吧每一个文件放入异步任务中 去执行
            int threadNo = i;
            Object file = list.get(i);
            FutureTask<EntityMap> task = new FutureTask<EntityMap>((Callable) () -> {
                String threadName = Thread.currentThread().getName();
                return threadHandler.handlerObj(threadName, threadNo, file);
            });
            executorService.execute(task);
            //把每一个任务的返回信息放进入
            taskList.add(task);
        }
        executorService.shutdown();
        for (FutureTask<EntityMap> item : taskList) {
            try {
                EntityMap map = item.get();
                if (FlymeUtils.isNotEmpty(map)) {
                    result.add(map);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return result;
    }


    @Override
    public <T> List<EntityMap> handlerList(List<T> list, Integer count, ThreadHandler threadHandler) {
        List<EntityMap> result = new ArrayList<>();
        //根据文件的长度设置线程池
        ExecutorService executorService = Executors.newFixedThreadPool(count);
        // 定义一个任务集合
        List<Callable<EntityMap>> tasks = new LinkedList<>();
        List<Future<EntityMap>> futureList;
        for (int i = 0; i < list.size(); i++) {
            //把每一个对象放入异步任务中去执行
            int threadNo = i;
            T obj = list.get(i);
            Callable<EntityMap> task = () -> {
                String threadName = Thread.currentThread().getName();
                if (FlymeUtils.isNotEmpty(obj)) {
                    //任务回调
                    return threadHandler.handlerObj(threadName, threadNo, obj);
                }
                return null;
            };
            //把每一个任务的返回信息放进入
            tasks.add(task);
        }
        try {
            futureList = executorService.invokeAll(tasks);
            executorService.shutdown();
            if(FlymeUtils.isNotEmpty(futureList)) {
                for (Future<EntityMap> item : futureList) {
                    try{
                        EntityMap map = item.get();
                        if (FlymeUtils.isNotEmpty(map)) {
                            //放入返回结果
                            result.add(map);
                        }
                    }catch (Exception e){
                        e.printStackTrace();
                    }

                }
            }
        } catch (Exception e) {
        }
        return result;
    }

    @Override
    public <T> void submitList(List<T> list, Integer count, ThreadHandler threadHandler) {
        List<EntityMap> result = new ArrayList<>();
        //根据文件的长度设置线程池
        ExecutorService executorService = Executors.newFixedThreadPool(count);
        for (int i = 0; i < list.size(); i++) {
            //把每一个对象放入异步任务中去执行
            int threadNo = i;
            T obj = list.get(i);
            Callable<EntityMap> task = () -> {
                String threadName = Thread.currentThread().getName();
                if (FlymeUtils.isNotEmpty(obj)) {
                    //任务回调
                    return threadHandler.handlerObj(threadName, threadNo, obj);
                }
                return null;
            };
            FutureTask<EntityMap> futureTask = new FutureTask<>(task);
            executorService.execute(futureTask);
        }
        executorService.shutdown();
    }


    /**
     * 开启threadNum个线程处理数据
     *
     * @param data
     * @param threadNum
     * @param threadHandler
     */
    @Override
    public <T> void handlerByThreadNum(List<T> data, Integer threadNum, ThreadHandler threadHandler) {
        handler(data, threadNum, false, threadHandler);
    }

    /**
     * 每threadSize条数据开启一条线程
     *
     * @param data
     * @param threadSize
     * @param threadHandler
     */
    @Override
    public <T> void handlerByThreadSize(List<T> data, Integer threadSize, ThreadHandler threadHandler) {
        handler(data, threadSize, true, threadHandler);
    }

}
